/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package studio.raptor.ddal.core.transaction;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.io.Files;
import com.google.common.io.PatternFilenameFilter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import studio.raptor.ddal.common.exception.GenericException;
import studio.raptor.ddal.common.exception.code.CommonErrorCodes;
import studio.raptor.ddal.common.util.RuntimeUtil;
import studio.raptor.ddal.common.util.StringUtil;

/**
 * 事务恢复
 *
 * 1、事务恢复产生的日志与运行时的事务日志需做隔离，避免污染业务操作的事务日志。
 *
 * @author Sam
 * @since 3.0.0
 */
public class TransactionRecovery {
  private static Logger logger = LoggerFactory.getLogger(TransactionRecovery.class);
  private final Splitter lineSplitter = Splitter.on('|');
  private final String LOG_PATH;

  private Map<String, Transaction> unfinishedLogs = new HashMap<>(); // <txId, txLog>
  private Set<String> committedTxIds = new HashSet<>();

  public TransactionRecovery() {
    LOG_PATH = RuntimeUtil.getTransactionRecordLog();
  }

  /**
   * 文件list接口返回的列表默认已经排序，为了保险起见，再强制排序一次。排序之后
   * 的数组最后一个元素是ddal当前日志文件ddal.transaction.log
   *
   * 使用日志写入的顺序（n-2,n-3...,1,0,n-1）对日志文件进行处理
   */
  public void recover() {
    checkLogPath();
    String[] txFiles = listTxFiles();
    if (null == txFiles || txFiles.length == 0) {
      logger.info("No transaction log found under path of {}", LOG_PATH);
      return;
    }
    Arrays.sort(txFiles);
    for (int len = txFiles.length, i = len - 2; i >= 0; i--) {
      findOutUnfinishedTx(txFiles[i]);
    }
    findOutUnfinishedTx(txFiles[txFiles.length - 1]);
    if (unfinishedLogs.size() == 0) {
      logger.info("No unfinished transaction found, continuing to startup...");
    } else {
      logger.info("Found {} transaction(s), prepared to re-commit.", unfinishedLogs.size());
      logger.info("Unfinished transaction(s) list: \n{}", Joiner.on('\n').join(unfinishedLogs.values()));
    }
  }

  private void findOutUnfinishedTx(String file) {
    ByteArrayOutputStream lineStream = null;
    try {
      MappedByteBuffer mbb = Files.map(new File(LOG_PATH + File.separator + file), FileChannel.MapMode.READ_ONLY);
      lineStream = new ByteArrayOutputStream();
      int lineCount = 0;
      while (mbb.hasRemaining()) {
        byte b = mbb.get();
        if (b == '\n' && lineStream.size() > 0) {
          List<String> txfLineSplits = lineSplitter.splitToList(lineStream.toString("UTF-8"));
          if (txfLineSplits.size() > 3) {
            readTxLog(txfLineSplits);
          } else if (txfLineSplits.size() == 3) {
            readCommitLog(txfLineSplits);
          }
          // try to optimize log data during tx log reading
          if (++lineCount % 20 == 0) {
            optimizeUnfinishedTxLog();
          }
          lineStream.reset();
        } else {
          lineStream.write(b);
        }
      }
      // optimize finally
      optimizeUnfinishedTxLog();
    } catch (IOException ioe) {
      throw new GenericException(CommonErrorCodes.COMMON_508, new Object[]{file});
    } finally {
      try {
        if (null != lineStream) {
          lineStream.close();
        }
      } catch (IOException ignore) {
      }
    }
  }

  private void optimizeUnfinishedTxLog() {
    Iterator<Map.Entry<String, Transaction>> iterator = unfinishedLogs.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<String, Transaction> entry = iterator.next();
      if (committedTxIds.contains(entry.getKey())) {
        committedTxIds.remove(entry.getKey());
        iterator.remove();
      }
    }
  }

  private void readTxLog(List<String> logLine) {
    String txId = logLine.get(4);
    Transaction tx = new Transaction(txId, Integer.parseInt(logLine.get(2)), logLine.get(3));
    for (int i = 5, len = logLine.size(); i < len; i++) {
      String sql;
      if (!StringUtil.isEmpty((sql = logLine.get(i)))) {
        tx.addSql(sql);
      }
    }
    if (null != unfinishedLogs.put(txId, tx)) {
      throw new GenericException(CommonErrorCodes.COMMON_510, new Object[]{txId});
    }
  }

  private void readCommitLog(List<String> commitLine) {
    if (!committedTxIds.add(commitLine.get(1))) {
      throw new GenericException(CommonErrorCodes.COMMON_511, new Object[]{commitLine.get(1)});
    }
  }

  private void checkLogPath() {
    if (StringUtil.isEmpty(LOG_PATH)) {
      throw new GenericException(CommonErrorCodes.COMMON_506, new Object[]{LOG_PATH});
    }
  }

  /**
   * 应用启动时不会有新增的log文件写入，总是对现有存量的日志文件进行操作。
   *
   * @return 日志文件列表
   */
  private String[] listTxFiles() {
    File txFilePath = new File(LOG_PATH);
    if (!txFilePath.exists() || !txFilePath.isDirectory()) {
      throw new GenericException(CommonErrorCodes.COMMON_507, new Object[]{LOG_PATH});
    }
    return txFilePath.list(new PatternFilenameFilter(".*ddal.transaction.*\\.log"));
  }

  static class Transaction {
    private String txId;
    private int dbTypeOrdinal;
    private String virtualDB;
    private List<String> orderedSqlList;

    Transaction(String txId, int dbTypeOrdinal, String virtualDB) {
      this.txId = txId;
      this.dbTypeOrdinal = dbTypeOrdinal;
      this.virtualDB = virtualDB;
      this.orderedSqlList = new ArrayList<>();
    }

    void addSql(String sql) {
      this.orderedSqlList.add(sql);
    }

    List<String> getSqlList() {
      return this.orderedSqlList;
    }

    public String getTxId() {
      return txId;
    }

    public int getDbTypeOrdinal() {
      return dbTypeOrdinal;
    }

    public String getVirtualDB() {
      return virtualDB;
    }

    @Override
    public String toString() {
      return "Transaction{" +
              "txId='" + txId + '\'' +
              ", dbTypeOrdinal=" + dbTypeOrdinal +
              ", virtualDB='" + virtualDB + '\'' +
              ", orderedSqlList=" + orderedSqlList +
              '}';
    }
  }
}
